package com.atguigu.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.func.DimCreateTableRichMapFunction;
import com.atguigu.app.func.DimSinkFunction;
import com.atguigu.app.func.DimTableProcessFunction;
import com.atguigu.bean.TableProcess;
import com.atguigu.common.Constant;
import com.atguigu.utils.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Flink streaming job that routes records from the ODS Kafka topic to the sink
 * implemented by {@link DimSinkFunction}, driven by a configuration table that
 * is read from MySQL through Flink CDC.
 *
 * <p>Pipeline built by {@link #main(String[])}:
 * <ol>
 *   <li>read raw strings from the Kafka topic {@code Constant.TOPIC_ODS_DB};</li>
 *   <li>parse them into {@link JSONObject}s, dropping records that are not usable JSON;</li>
 *   <li>read the {@code table_process} configuration table with Flink CDC;</li>
 *   <li>map each configuration change through {@link DimCreateTableRichMapFunction};</li>
 *   <li>broadcast the configuration stream and connect it to the main stream;</li>
 *   <li>process the connected stream with {@link DimTableProcessFunction};</li>
 *   <li>hand the result to {@link DimSinkFunction}.</li>
 * </ol>
 */
public class DimApp {

    /**
     * Builds the job graph and submits it under the job name {@code "DimApp"}.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {

        // 1. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1.1 Enable checkpointing (currently disabled).
//        env.enableCheckpointing(10000L);
//        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//        checkpointConfig.setCheckpointTimeout(20000L);
//        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink-ck");
//        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        //checkpointConfig.setCheckpointInterval(10000L);
//        checkpointConfig.setMinPauseBetweenCheckpoints(5000L);
//        checkpointConfig.setMaxConcurrentCheckpoints(2);
//        // the default is the maximum int value
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
//        env.setStateBackend(new HashMapStateBackend());
//
//        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // 2. Read the ODS-layer change data from Kafka (consumer group "dim_app_230315").
        DataStreamSource<String> kafkaDS = env.fromSource(KafkaUtil.getKafkaSource(Constant.TOPIC_ODS_DB, "dim_app_230315"), WatermarkStrategy.noWatermarks(), "kafka-source");

        // 3. Convert each record to a JSON object and filter out dirty data (main stream).
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                // Nothing to parse: skip silently, as was already done for the empty string.
                if (value == null || value.isEmpty()) {
                    return;
                }
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    // parseObject can return null without throwing (e.g. blank text or
                    // the JSON literal null); a null record must never be emitted
                    // downstream, so report it as dirty data instead.
                    if (jsonObject == null) {
                        System.out.println("脏数据：" + value);
                        return;
                    }
                    out.collect(jsonObject);
                } catch (JSONException e) {
                    // Malformed JSON: report and drop the record, keep the job running.
                    System.out.println("脏数据：" + value);
                }
            }
        });

        // Debug output of the parsed main stream.
        jsonObjDS.print("jsonObjDS>>>>");

        // 4. Use Flink CDC to read the configuration table from MySQL and create the config stream.
        // NOTE(review): username and password are hard-coded here; consider moving them
        // to external configuration so credentials are not kept in source control.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(Constant.MYSQL_HOST)
                .port(Constant.MYSQL_PORT)
                .username("root")
                .password("000000")
                .databaseList("gmall-230315-config")
                .tableList("gmall-230315-config.table_process")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source");

        // 4.1 Create tables: each config change is mapped to a TableProcess by
        // DimCreateTableRichMapFunction (the table creation itself is presumably done
        // inside that function -- confirm there).
        SingleOutputStreamOperator<TableProcess> tableProcessDS = mysqlDS
                .map(new DimCreateTableRichMapFunction());

        // 5. Turn the configuration stream into a broadcast stream.
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("bc-state", String.class, TableProcess.class);
        BroadcastStream<TableProcess> broadcastDS = tableProcessDS.broadcast(mapStateDescriptor);

        // 6. Connect the main stream with the broadcast stream.
        BroadcastConnectedStream<JSONObject, TableProcess> connectedStream = jsonObjDS.connect(broadcastDS);

        // 7. Process main-stream records according to the broadcast state.
        SingleOutputStreamOperator<JSONObject> hbaseDS = connectedStream.process(new DimTableProcessFunction(mapStateDescriptor));

        // 8. Write the data out to HBase (via DimSinkFunction); also print it for debugging.
        hbaseDS.print("hbase>>>>");
        hbaseDS.addSink(new DimSinkFunction());

        // 9. Start the job.
        env.execute("DimApp");

    }
}
